/* Copyright 2012 The Netty Project
 * 
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the License at:
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License. */
package io.netty.example.factorial;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Handler for a client-side channel. This handler maintains stateful information which is specific to a certain channel
 * using member variables. Therefore, an instance of this handler can cover only one channel. You have to create a new
 * handler instance whenever you create a new channel and insert this handler to avoid a race condition.
 */
public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {

	private ChannelHandlerContext ctx;
	private int receivedMessages;
	private int next = 1;
	final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

	public BigInteger getFactorial() {
		boolean interrupted = false;
		try {
			for (;;) {
				try {
					return answer.take();
				} catch (InterruptedException ignore) {
					interrupted = true;
				}
			}
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		this.ctx = ctx;
		sendNumbers();
	}

	@Override
	public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
		receivedMessages++;
		if (receivedMessages == FactorialClient.COUNT) {
			// Offer the answer after closing the connection.
			ctx.channel().close().addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) {
					boolean offered = answer.offer(msg);
					assert offered;
				}
			});
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}

	private void sendNumbers() {
		// Do not send more than 4096 numbers.
		ChannelFuture future = null;
		for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
			future = ctx.write(Integer.valueOf(next));
			next++;
		}
		if (next <= FactorialClient.COUNT) {
			assert future != null;
			future.addListener(numberSender);
		}
		ctx.flush();
	}

	private final ChannelFutureListener numberSender = new ChannelFutureListener() {
		@Override
		public void operationComplete(ChannelFuture future) throws Exception {
			if (future.isSuccess()) {
				sendNumbers();
			} else {
				future.cause().printStackTrace();
				future.channel().close();
			}
		}
	};
}
